


/*
file
clickhouse
elasticsearch
*/

use crate::conf_init::{MapErr, sink_conf, MapResult, normal_conf};
use crate::data_frame::{DataInstance};
use async_trait::async_trait;
#[cfg(feature="sink_clickhouse")]
use crate::sink::clickhouse::ClickhouseConf;
use crate::sink::file::{FileConf, FileOut};
use crate::sink::std_out::{StdoutConf, StdOut};
use crate::util::{AsyncRecvData, SyncRecv, SyncSender, channel_new};
use std::collections::HashMap;
use tokio::runtime::Runtime;
use std::sync::{Arc};
use crate::logical_plan::work_node::{work_node_add_next, work_node_add_next_online,work_node_get_work_space_name};
use std::fmt::Debug;
use crate::indicators::{indicators_key_ref, Op, IndicatorsMode, sink_input_register};
use std::time::Duration;
use crate::data_frame::single_data::SingleData;
use crate::sink::clickhouse::ClickHouse;
use tokio::sync::RwLock as TokioRwLock;
use tokio::sync::RwLockReadGuard as TokioRwLockReadGuard;
use tokio::sync::RwLockWriteGuard as TokioRwLockWriteGuard;
use once_cell::sync::OnceCell;
use linked_hash_map::LinkedHashMap;

#[cfg(feature="sink_clickhouse")]
mod clickhouse;
mod es;
mod file;
mod std_out;

#[derive(Deserialize,Debug)]
pub enum SinkWay{
    File(FileConf),
    #[cfg(feature="sink_clickhouse")]
    ClickHouse(ClickhouseConf),
    Stdout(StdoutConf),
}

#[derive(Deserialize,Debug)]
pub struct SinkConf{
    pub name: String,
    enable: bool,
    input: Vec<String>,
    threads: usize,
    concurrent: usize,
    burst: usize,
    way: SinkWay,
}
#[derive(Debug)]
pub struct SinkInstance{
    name: String,
    runtime: Arc<Runtime>,
    ctl_recv: SyncRecv<SinkMsg>,
    input: SinkInput,
    sink: Box<dyn Sink>,
    burst: usize,
    concurrent: usize,
}
#[derive(Debug)]
pub struct SinkCtl{
    pub name: String,
    pub enable: bool,
    runtime: Arc<Runtime>,
    ctl_send: SyncSender<SinkMsg>,
    input: SinkInput,
    sink: Option<SinkInstance>,
}
#[derive(Clone,Debug)]
pub struct SinkInput{
    input: Arc<TokioRwLock<LinkedHashMap<String, AsyncRecvData>>>,
}
impl SinkInput{
    fn new()->SinkInput{
        SinkInput{
            input: Arc::new(TokioRwLock::new(LinkedHashMap::new())),
        }
    }
    async fn show_input(&self){
        let input = self.input.read().await;
        for (input,_) in input.iter(){
            info!("input [{}]", input);
        }
    }
    async fn check_input(&self, input_name: &str) -> bool{
        let input = self.input.read().await;
        if let Some(_) = input.get(input_name) {
            return true;
        }
        return false;
    }
    async fn add_input(&self, input_name: &str, r: AsyncRecvData)->MapResult<()>{
        let mut input = self.input.write().await;

        if let Some(_) = input.get(input_name){
            return MapResult::Err(MapErr::new(format!("input [{}] already exist", input_name)));
        }

        sink_input_register(input_name, r.clone()).await;
        input.insert(input_name.to_string(), r);
        return MapResult::Ok(());
    }
    async fn try_recv(&self, name: &str)->Vec<Vec<SinkData>>{

        let mut store_data = Vec::new();

        let input = self.input.read().await;

        /*
            从多个input收一大批包，对于sink的input，先简单定一个约束：
                只收BatchData类型的数据，如果有其他类型的数据，需要前置转换
        */

        for (_, input) in input.iter() {
            if let Some(data) = input.try_recv().await {
                match data{
                    DataInstance::GroupData(group_data) => {
                        indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, "drop", group_data.len());
                    },
                    DataInstance::BatchData(batch_data) => {
                        indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, "recv", batch_data.len());
                        store_data.push(batch_data);
                    },
                }
            }
        }

        return store_data;
    }
    async fn input_register(&self){

        let input_list = self.input.read().await;
        for (input_name, input) in input_list.iter(){
            sink_input_register(input_name, input.clone()).await;
        }
    }
}
#[derive(Debug)]
enum SinkMsg{
    AddInput(String, AsyncRecvData),
}

static SINK_LIST: OnceCell<TokioRwLock<HashMap<String, SinkCtl>>> = OnceCell::new();

pub async fn sink_list_read<'a>()->TokioRwLockReadGuard<'a, HashMap<String, SinkCtl>>{
    let tmp = SINK_LIST.get();

    let tmp = if let Some(tmp) = tmp{
        tmp
    }else{
        let value = match sink_init_from_conf().await{
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("sink_init_from_conf err: {:?}", e);
                std::process::exit(-1);
            },
        };
        SINK_LIST.get_or_init(||{value})
    };

    tmp.read().await
}

pub async fn sink_list_write<'a>()->TokioRwLockWriteGuard<'a, HashMap<String, SinkCtl>>{
    let tmp = SINK_LIST.get();

    let tmp = if let Some(tmp) = tmp{
        tmp
    }else{
        let value = match sink_init_from_conf().await{
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("sink_init_from_conf err: {:?}", e);
                std::process::exit(-1);
            },
        };
        SINK_LIST.get_or_init(||{value})
    };

    tmp.write().await
}

async fn sink_init_from_conf()->MapResult<TokioRwLock<HashMap<String, SinkCtl>>>{

    info!("sink_init_from_conf");

    let sink_conf = sink_conf();
    let mut sink_ctl_list = HashMap::new();

    if let Some(sink_conf) = sink_conf{
        for sink_conf in sink_conf.iter(){
            if !sink_conf.enable{
                continue;
            }

            let sink_ctl = sink_init_from_conf_one(sink_conf).await?;

            sink_ctl_list.insert(sink_conf.name.clone(), sink_ctl);
        }
    }

    return MapResult::Ok(TokioRwLock::new(sink_ctl_list));
}

pub async fn sink_register_online(sink_conf: &SinkConf)->MapResult<()>{
    {
        let t =  sink_list_read().await;
        if let Some(_) = t.get(&sink_conf.name){
            return MapResult::Err(MapErr::new(format!("Sink name is regestered")));
        }
    }

    let mut sink_ctl = sink_init_from_conf_one(sink_conf).await?;

    sink_run_one(&mut sink_ctl)?;

    let mut sink_ctl_list = sink_list_write().await;
    sink_ctl_list.insert(sink_conf.name.clone(), sink_ctl);
    return MapResult::Ok(());
}

pub async fn sink_add_input_online(sink_name: &str, input_name: &str)->MapResult<()>{

    let sink_ctl_list = sink_list_read().await;

    if let Some(sink) = sink_ctl_list.get(sink_name){

        if let Some(a) = work_node_get_work_space_name(input_name).await {
            if sink.input.check_input(&a).await == true {
                return MapResult::Err(MapErr::new(format!("input name is existed {}", input_name)));
            }
        }

        let work_node = work_node_add_next_online(input_name, sink_name).await?;
        if let Some(work_node) = work_node{
            sink.input.add_input(work_node.0.as_str(), work_node.1).await?;
            return MapResult::Ok(());
        }else{
            return MapResult::Err(MapErr::new(format!("sink [{}] input [{}] not enable", sink_name, input_name)));
        }
    }else{
        return MapResult::Err(MapErr::new(format!("sink [{}] not exist", sink_name)));
    }
}

async fn sink_init_from_conf_one(sink_conf: &SinkConf)->MapResult<SinkCtl>{

    let sink_runtime = match tokio::runtime::Builder::new_multi_thread()
        .thread_name(format!("sink_{}", sink_conf.name))
        .worker_threads(sink_conf.threads)
        .max_blocking_threads(1)
        .enable_all()
        .build(){
        Ok(t) => {Arc::new(t)},
        Err(e) => {
            return MapResult::Err(MapErr::new(format!("sink {} build runtime failed {:?}", sink_conf.name, e)));
        },
    };

    let input_list = SinkInput::new();
    for input in sink_conf.input.iter(){
        info!("sink [{}] input [{}]", sink_conf.name, input);
        let sink_input = work_node_add_next(input, sink_conf.name.as_str()).await?;
        if let Some(sink_input) = sink_input{
            input_list.add_input(input, sink_input.1).await?;
        }else{
            info!("sink [{}] input work_node [{}] not enable", sink_conf.name, input);
        }
    }

    let (ctl_send,ctl_recv) = channel_new();

    let input_instance = input_list.clone();
    let sink_instance = SinkInstance{
        name: sink_conf.name.clone(),
        runtime: Arc::clone(&sink_runtime),
        ctl_recv: ctl_recv,
        input: input_instance,
        sink: sink_init(&sink_conf)?,
        burst: sink_conf.burst,
        concurrent: sink_conf.concurrent,
    };

    let sink_ctl = SinkCtl{
        name: sink_conf.name.clone(),
        enable: sink_conf.enable,
        runtime: Arc::clone(&sink_runtime),
        input: input_list,
        ctl_send: ctl_send,
        sink: Some(sink_instance),
    };

    return MapResult::Ok(sink_ctl);
}

fn sink_init(sink_conf: &SinkConf)->MapResult<Box<dyn Sink>>{
    match &sink_conf.way{
        SinkWay::File(file_out_conf) => {FileOut::new(file_out_conf)},
        SinkWay::Stdout(_std_out_conf) => {StdOut::new()},
        #[cfg(feature="sink_clickhouse")]
        SinkWay::ClickHouse(ck_conf) => {ClickHouse::new(sink_conf.name.as_str(), ck_conf)},
    }
}

pub async fn sink_run()->MapResult<()>{
    info!("sink_run");

    let mut sink_ctl_list = sink_list_write().await;

    for (sink_name, sink_ctl) in sink_ctl_list.iter_mut(){
        sink_run_one(sink_ctl)?;
    }

    return MapResult::Ok(());
}

fn sink_run_one(sink_ctl: &mut SinkCtl)->MapResult<()>{

    if sink_ctl.enable{
        info!("sink [{}] is enable", sink_ctl.name);
        if let Some(sink) = sink_ctl.sink.take(){
            sink_ctl.runtime.spawn(
                sink_input(sink)
            );
            return MapResult::Ok(());
        }else{
            let desc = format!("sink {} take is None", sink_ctl.name);
            return MapResult::Err(MapErr::new(desc));
        }
    }else{
        info!("sink [{}] not enable", sink_ctl.name);
        return MapResult::Ok(());
    }
}

async fn sink_input(mut sink: SinkInstance){

    sink.input.input_register().await;
    let normal_conf = normal_conf();
    tokio::time::sleep(Duration::from_secs(120)).await;

    loop {
        loop {
            if sink.sink.init().await {
                debug!("sink {} init ok", sink.name);
                indicators_key_ref(Op::Add, IndicatorsMode::Sink, &sink.name, "init ok", 1);
                break;
            } else {
                debug!("sink {} init fail", sink.name);
                indicators_key_ref(Op::Add, IndicatorsMode::Sink, &sink.name, "init err", 1);
                std::thread::sleep(Duration::from_secs(1));
            }
        }

        loop {
            let store_data = sink.input.try_recv(&sink.name).await;
            if store_data.len() > 0{
                match sink.sink.store(&sink.runtime, store_data, sink.burst, sink.concurrent).await{
                    SinkResult::Ok(_) => {},
                    SinkResult::Err(_e) => {
                        break;
                    },
                }
            }else{
                tokio::time::sleep(Duration::from_millis(normal_conf.sleep_while_null)).await;
            }
        }
    }
}

pub type SinkResult = Result<(), ()>;

pub type SinkData = SingleData;

#[async_trait]
pub trait Sink: Send+Sync+Debug{
    async fn init(&mut self)->bool;
    async fn store(&mut self, runtime: &Runtime, data: Vec<Vec<SinkData>>, burst: usize, concurrent: usize)->SinkResult;
}



